package com.zrkizzy.common.mq;

import com.zrkizzy.common.mq.service.IConsumerService;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.aopalliance.aop.Advice;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.config.RetryInterceptorBuilder;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.retry.RejectAndDontRequeueRecoverer;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.retry.RetryCallback;
import org.springframework.retry.RetryContext;
import org.springframework.retry.RetryListener;
import org.springframework.retry.backoff.BackOffPolicy;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

import java.util.Objects;

/**
 * RabbitMQ监听器工厂
 *
 * @author zhangrongkang
 * @since 2023/10/24
 */
@Data
@Slf4j
@Builder
@AllArgsConstructor
public class RabbitListenerFactory implements FactoryBean<SimpleMessageListenerContainer> {

    /**
     * RabbitMQ连接工厂
     */
    private ConnectionFactory connectionFactory;

    /**
     * RabbitMQ操作管理器
     */
    private AmqpAdmin amqpAdmin;

    /**
     * 消息队列
     */
    private Queue queue;

    /**
     * 交换机
     */
    private Exchange exchange;

    /**
     * 消费者
     */
    private IConsumerService consumer;

    /**
     * 重试回调
     */
    private RabbitRetryListener rabbitRetryListener;

    /**
     * 最大重试次数
     */
    private final Integer maxAttempts = 5;

    /**
     * 是否自动确认
     */
    private Boolean autoAck;

    /**
     * 创建SimpleMessageListenerContainer逻辑
     *
     * @return SimpleMessageListenerContainer对象
     * @throws Exception 异常
     */
    @Override
    public SimpleMessageListenerContainer getObject() throws Exception {
        // 手动创建对象
        SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer();
        // MQ操作管理器
        simpleMessageListenerContainer.setAmqpAdmin(amqpAdmin);
        // 连接工厂
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
        simpleMessageListenerContainer.setQueues(queue);
        simpleMessageListenerContainer.setPrefetchCount(20);
        simpleMessageListenerContainer.setConcurrentConsumers(20);
        simpleMessageListenerContainer.setMaxConcurrentConsumers(100);
        simpleMessageListenerContainer.setDefaultRequeueRejected(Boolean.FALSE);
        // 重试逻辑
        simpleMessageListenerContainer.setAdviceChain(setRetryStrategy());
        simpleMessageListenerContainer.setAcknowledgeMode(autoAck ? AcknowledgeMode.AUTO : AcknowledgeMode.MANUAL);
        // 如果消费者不为空
        if (Objects.nonNull(consumer)) {
            // 设置消费者监听
            simpleMessageListenerContainer.setMessageListener(consumer);
        }
        return simpleMessageListenerContainer;
    }

    /**
     * 配置重试策略
     *
     * @return 重试策略
     */
    private Advice setRetryStrategy() {
        // 重试策略模板
        RetryTemplate retryTemplate = new RetryTemplate();
        retryTemplate.registerListener(new RetryListener() {
            @Override
            public <T, E extends Throwable> boolean open(RetryContext retryContext, RetryCallback<T, E> retryCallback) {
                // 第一次重试调用直接放行
                return true;
            }

            @Override
            public <T, E extends Throwable> void close(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {

            }

            @Override
            public <T, E extends Throwable> void onError(RetryContext retryContext, RetryCallback<T, E> retryCallback, Throwable throwable) {
                // 如果当前的重试回调不为空
                if (Objects.nonNull(rabbitRetryListener)) {
                    // 调用每一次重试回调逻辑
                    rabbitRetryListener.onceRetry(retryContext, retryCallback, throwable);
                    // 如果达到了最大重试次数
                    if (maxAttempts.equals(retryContext.getRetryCount())) {
                        // 调用最后一次重试方法
                        rabbitRetryListener.lastRetry(retryContext, retryCallback, throwable);
                    }
                }
            }
        });
        // 设置重试策略，最大重试多少次
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(maxAttempts));
        // 设置重试时间间隔策略
        retryTemplate.setBackOffPolicy(setExponentialBackOffPolicy());
        return RetryInterceptorBuilder.stateless()
                .retryOperations(retryTemplate)
                .recoverer(new RejectAndDontRequeueRecoverer())
                .build();
    }

    /**
     * 设置过期时间
     *
     * @return 回退策略
     */
    private BackOffPolicy setExponentialBackOffPolicy() {
        ExponentialBackOffPolicy exponentialBackOffPolicy = new ExponentialBackOffPolicy();

        // 重试间隔数（单位为毫秒）
        exponentialBackOffPolicy.setInitialInterval(3000);
        // 第一次到最后一次重试最大时间间隔（毫秒）
        exponentialBackOffPolicy.setMaxInterval(15000);
        // 重试指数
        exponentialBackOffPolicy.setMultiplier(1);

        return exponentialBackOffPolicy;
    }

    @Override
    public Class<?> getObjectType() {
        return SimpleMessageListenerContainer.class;
    }

}
